# Source code for hysop.core.graph.graph_builder

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import numpy as np

from hysop import vprint, dprint, Problem
from hysop.fields.continuous_field import ScalarField
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.io_utils import IOParams

from hysop.tools.transposition_states import TranspositionState
from hysop.constants import MemoryOrdering, Backend

from hysop.parameters.parameter import Parameter
from hysop.topology.cartesian_topology import CartesianTopologyState

from hysop.core.graph.graph import (
    new_directed_graph,
    new_vertex,
    new_edge,
    is_directed_acyclic_graph,
    transitive_reduction,
    lexicographical_topological_sort,
    all_simple_paths,
)
from hysop.core.graph.computational_graph import ComputationalGraph
from hysop.core.graph.computational_node import ComputationalGraphNode
from hysop.core.graph.computational_operator import ComputationalGraphOperator

from hysop.fields.field_requirements import (
    DiscreteFieldRequirements,
    MultiFieldRequirements,
)

from hysop.operator.redistribute import (
    Redistribute,
    RedistributeInter,
    RedistributeInterParam,
    RedistributeNotImplementedError,
)
from hysop.operator.transpose import Transpose, TranspositionNotImplementedError
from hysop.operator.memory_reordering import (
    MemoryReordering,
    MemoryReorderingNotImplementedError,
)
from hysop.tools.sympy_utils import subscript

# Debug level for graph building
#   0: no debug logs
#   1: print debug info about input/outputs and operator generation
#   2: print debug info about topology states in addition
GRAPH_BUILDER_DEBUG_LEVEL = 0


def gprint(*args, **kwds):
    """Print *args* when GRAPH_BUILDER_DEBUG_LEVEL is at least ``level``.

    The ``level`` keyword (default: 1) is popped from ``kwds`` before the
    remaining keywords are forwarded to :func:`print`.
    """
    level = kwds.pop("level", 1)
    if GRAPH_BUILDER_DEBUG_LEVEL >= level:
        print(*args, **kwds)


def gprint2(*args, **kwds):
    """Print *args* only at the most verbose debug level (2)."""
    kwds["level"] = 2
    gprint(*args, **kwds)
class GraphBuilder:
    """
    Helper class to build graph of computational operators
    (see ComputationalGraph).
    """

    def __init__(self, node):
        """
        Initialize a GraphBuilder with given target ComputationalGraph.

        Parameters
        ----------
        node: hysop.core.graph.ComputationalGraph
            target node that should generate its graph

        Notes
        -----
        The following class is a helper for ComputationalGraph graph building
        step. It should not be used outside of ComputationalGraph.
        """
        check_instance(node, ComputationalGraph)
        self.target_node = node
        gprint("::Graph builder::")
        gprint(f">Initialized graph builder for ComputationalGraph {node.name}")
def configure(
    self, current_level, outputs_are_inputs, search_intertasks_ops, **kwds
):
    """Record graph-building options and reset the graph and variable maps.

    Parameters
    ----------
    current_level: int
        recursion depth of the graph being built (0 is the root graph)
    outputs_are_inputs: bool
        whether output topology states must be closed back onto inputs
    search_intertasks_ops: bool
        whether inter-task redistribution operators should be searched for
    kwds: dict
        extra keyword arguments, only echoed in the debug message here
    """
    check_instance(current_level, int)
    check_instance(outputs_are_inputs, bool)
    check_instance(search_intertasks_ops, bool)
    self.current_level = current_level
    self.outputs_are_inputs = outputs_are_inputs
    self.search_intertasks_ops = search_intertasks_ops
    # fresh graph and bookkeeping dictionaries for every configuration
    self.setup_graph()
    self.setup_variables()
    msg = ">Builder configuration: current_level={}, outputs_are_inputs={}, search_intertasks_ops={}, **kwds={}"
    msg = msg.format(current_level, outputs_are_inputs, search_intertasks_ops, kwds)
    gprint(msg)
def setup_graph(self):
    """Create a fresh empty directed graph for operator nodes."""
    self.graph = new_directed_graph()
def setup_variables(self):
    """Reset all per-build bookkeeping dictionaries.

    Field/parameter maps track the graph-level inputs and outputs;
    the ``*topology_states`` maps track discrete topology states per
    continuous field and per operator.
    """
    self.input_fields = {}
    self.output_fields = {}
    self.input_params = {}
    self.output_params = {}
    self.topology_states = {}
    self.input_topology_states = {}
    self.output_topology_states = {}
    self.op_input_topology_states = {}
    self.op_output_topology_states = {}
def new_topology_state(self, field):
    """Build a new per-field continuous state tracker.

    The tracker shares the builder's operator state maps and the target
    node's input-dump configuration and method.
    """
    return self.__ContinuousFieldState(
        field,
        self.op_input_topology_states,
        self.op_output_topology_states,
        self.target_node._input_fields_to_dump,
        self.target_node.method,
    )
def new_node(
    self, op, subgraph, current_level, node, node_id, opvertex, insert_at=-1
):
    """Add a new operator vertex to the graph.

    Optional position of new nodes in nodelist: when ``insert_at >= 0``
    every existing vertex with a larger ``node_id`` is shifted up by one
    and the new vertex is placed right after ``insert_at`` (adjusts
    node_id for nodes sorting).
    """
    graph = self.graph
    opnode = new_vertex(graph, op).copy_attributes(opvertex)
    gprint(f" *Created node is {int(opnode)}." + op.name)
    if insert_at >= 0:
        # shift node ids to make room for the inserted vertex
        for _ in graph.nodes:
            if _.node_id > insert_at:
                _.node_id += 1
        opnode.node_id = insert_at + 1
        gprint(f" *Created moved to {int(opnode)}.")
    return opnode
# NOTE(review): this block was recovered from an HTML-rendered page; the
# original line structure and indentation were lost (several statements are
# even cut mid-token across the recovered line wraps). It is kept
# byte-identical below rather than risking an incorrect re-indentation of
# ~700 lines of intricate, order-sensitive MPI logic.
# Purpose (from the visible code): builds the directed graph of operators
# for the target ComputationalGraph. It (1) rejects duplicate node
# references, (2) per operator registers input/output parameters and fields
# and their topology states (__handle_node), (3) searches elements to
# exchange between MPI tasks and inserts RedistributeInter /
# RedistributeInterParam operators (__find_elements_to_redistribute, using
# isend/recv/bcast on the domain communicators), (4) resolves deferred
# operators with undetermined topologies, (5) adds HDF_Writer dump
# operators for requested output fields, (6) when outputs_are_inputs at
# level 0, closes output topology states back onto input states,
# (7) verifies the graph is a DAG, applies transitive reduction and a
# lexicographical topological sort, and (8) colors data-independent chains
# into command queues via all_simple_paths reachability checks.
[docs] def build_graph(self): target_node = self.target_node current_level = self.current_level outputs_are_inputs = self.outputs_are_inputs graph = self.graph parameter_handler = self.__ParameterHandler(graph) input_fields = self.input_fields output_fields = self.output_fields input_params = self.input_params output_params = self.output_params input_topology_states = self.input_topology_states output_topology_states = self.output_topology_states op_input_topology_states = self.op_input_topology_states op_output_topology_states = self.op_output_topology_states self._deferred_operators = [] self._double_check_inputs = {} self._redistribute_inter = [] self._intertasks_exchanged = set() # check that all target nodes are unique to prevent conflicts if len(set(target_node.nodes)) != len(target_node.nodes): duplicates = { x for x in target_node.nodes if target_node.nodes.count(x) > 1 } msg = ( "\n\nFATAL ERROR: ComputationalGraph {} contains mutiple references to " ) msg += "the same nodes.\n" msg += "Concerned operators are:\n" for op in duplicates: msg0 = " *Operator {:12s} (cls={:30s} | id={}): {} occurences\n" msg0 = msg0.format( op.name, type(op).__name__, id(op), target_node.nodes.count(op) ) msg += msg0 msg = msg.format(target_node.name) raise RuntimeError(msg) def __handle_node( node_id, node, subgraph, node_ops, node_vertices, from_subgraph, opvertex, op, opnode, ): gprint(f" *{op.name} ({type(op)})") opname = op.name oppname = op.pretty_name iparams = op.input_params oparams = op.output_params ifields = op.input_fields ofields = op.output_fields field_requirements = op._field_requirements if field_requirements is None: op.get_and_set_field_requirements() field_requirements = op._field_requirements if isinstance(op, RedistributeInter) or isinstance( op, RedistributeInterParam ): self._intertasks_exchanged = self._intertasks_exchanged.union( {_.name for _ in list(op.output_fields) + list(output_params)} ) self._intertasks_exchanged = 
self._intertasks_exchanged.union( {_.name for _ in list(op.input_fields) + list(op.input_params)} ) if not isinstance(op, Problem) and not isinstance(op, RedistributeInter): # try to fill in undertermined topologies (experimental feature) backends = op.supported_backends() for ifield, itopo in sorted(ifields.items(), key=lambda x: x[0].name): if itopo is not None: continue # look for ifield usage untill now if ( (ifield in ofields) and (ofields[ifield] is not None) and (ofields[ifield].backend.kind in backends) ): ifields[ifield] = ofields[ifield] elif ifield not in self.topology_states: if outputs_are_inputs: # we can try to push this operator after we're done self._deferred_operators.append((op, opnode)) else: msg = ( "\nGraphBuilder {} could not automatically " "determine the topology of input field {} in " "operator {}.\nTry to set a non empty " "TopologyDescriptor when passing the variable " "parameters, when creating the operator." "\nAutomatic topology detection is an " "experimental feature." ) msg = msg.format(target_node.name, ifield.name, op.name) raise RuntimeError(msg) else: cstate = self.topology_states[ifield] (itopo, dstate, node, ireqs) = cstate.first_topology_and_dstate field_requirements.update_inputs({ifield: ireqs}) if itopo.backend.kind not in backends: backend = itopo.backend.any_backend_from_kind(*backends) itopo = itopo.topology_like(backend=backend) ifields[ifield] = itopo for ofield, otopo in sorted(ofields.items(), key=lambda x: x[0].name): if otopo is not None: continue if (ofield in ifields) and (ifields[ofield] is not None): ofields[ofield] = ifields[ofield] elif ofield not in self.topology_states: msg = ( "\nGraphBuilder {} could not automatically determine " "the topology of input field {} in operator {}." "\nTry to set a non empty TopologyDescriptor when " "passing the variable parameters, when creating the " "operator.\nAutomatic topology detection is an " "experimental feature." 
) msg = msg.format(target_node.name, ofield.name, op.name) raise RuntimeError(msg) else: cstate = self.topology_states[ofield] (otopo, dstate, node, oreqs) = cstate.first_topology_and_dstate field_requirements.update_outputs({ofield: oreqs}) ofields[ofield] = otopo # iterate over subgraph operator input parameters if iparams: gprint(" >Input parameters") for iparam in sorted(iparams.keys(), key=lambda x: x.name): gprint(f" *{iparam.short_description()}") parameter_handler.handle_input_parameter(iparam, opnode) if iparam.name not in output_params: input_params[iparam] = iparams[iparam] # iterate over subgraph operator output parameters if oparams: gprint(" >Output parameters") for oparam in sorted(oparams.keys(), key=lambda x: x.name): gprint(f" *{oparam.short_description()}") parameter_handler.handle_output_parameter(oparam, opnode) output_params[oparam] = oparams[oparam] # iterate over subgraph operator input fields input_states = {} if ifields: gprint(" >Input fields") for ifield, itopo in sorted( ifields.items(), key=lambda x: x[0].name, reverse=True ): gprint( " *{}{}".format( ifield.name, ( " on an unknown topology" if (itopo is None) else f".{itopo.pretty_tag}" ), ) ) if itopo is None: assert isinstance(op, RedistributeInter) continue if isinstance(op, Problem): if ifield in op.initial_input_topology_states.keys(): ifreqs = op.initial_input_topology_states[ifield][0] else: ifreqs = None else: if current_level != 0 or isinstance(op, Problem): ifreqs = None else: ifreqs = field_requirements.get_input_requirement(ifield)[1] if ifield not in self.topology_states: cstate = self.new_topology_state(ifield) self.topology_states[ifield] = cstate is_new = True else: cstate = self.topology_states[ifield] is_new = False dstate = cstate.handle_input(opnode, itopo, ifreqs, graph, is_new) input_states[ifield] = dstate if is_new: input_fields[ifield] = itopo input_topology_states[ifield] = (ifreqs, dstate) if ifield not in self._double_check_inputs: 
self._double_check_inputs[ifield] = {} self._double_check_inputs[ifield].update({itopo: (ifreqs, dstate)}) # iterate over subgraph operator output fields output_states = {} if ofields: gprint(" >Output fields") for ofield, otopo in sorted( ofields.items(), key=lambda x: x[0].name, reverse=True ): gprint( " *{}{}".format( ofield.name, ( " on an unknown topology" if (otopo is None) else f".{otopo.pretty_tag}" ), ) ) if otopo is None: assert isinstance(op, RedistributeInter) continue if isinstance(op, Problem): if ofield in op.final_output_topology_states.keys(): ofreqs = op.final_output_topology_states[ofield][0] else: ofreqs = None else: ofreqs = ( None if (current_level != 0) else field_requirements.get_output_requirement(ofield)[1] ) istates = None if (current_level != 0) else input_states cstate = self.topology_states.setdefault( ofield, self.new_topology_state(ofield) ) invalidate_field = ofield not in op.get_preserved_input_fields() dstate = cstate.handle_output( opnode, otopo, ofreqs, op, istates, invalidate_field, graph, node_list=target_node.nodes, ) output_fields[ofield] = otopo output_states[ofield] = dstate output_topology_states[ofield] = (None, dstate) if (current_level == 0) and ((op, opnode) not in self._deferred_operators): opnode.set_op_info(op, input_states, output_states) op_input_topology_states[op] = input_states op_output_topology_states[op] = output_states def __find_elements_to_redistribute(available_elems, needed_elems): # The algorithm is to extract level0 input fields and topologies as needs # and meet with output fields and topologies as provided. The key feature is that # these informations are distributed across distinct tasks (sub-communicators). # Same algorithm is also used for parameters. 
domain = first_not_None( [ _.domain for _ in set(available_elems.values()).union( set(needed_elems.values()) ) if hasattr(_, "domain") ] ) comm = domain.parent_comm current_tasks = domain.current_task_list() def _name_to_key(n, d): var = [_ for _ in d.keys() if isinstance(_, str) and _ == n] var += [_ for _ in d.keys() if not isinstance(_, str) and _.name == n] if len(var) == 1: return var[0] return None # Find redistribute candidates available_names = { _.name for _ in available_elems.keys() } - self._intertasks_exchanged needed_names = { _.name for _ in needed_elems.keys() } - self._intertasks_exchanged mgs = " >[IT] Current task ({}) {} parameters and fields : {}" gprint( mgs.format(current_tasks, "can communicate", ", ".join(available_names)) ) gprint(mgs.format(current_tasks, "needs", ", ".join(needed_names))) # Inter-task matching is performed on root process available_names = {_: None for _ in available_names} # value is dest task needed_names = {_: None for _ in needed_names} # value is src task for current_task in current_tasks: if domain.task_root_in_parent(current_task) == domain.parent_rank: msg = "" # loop over other tasks for ot in (_ for _ in domain.all_tasks if _ != current_task): if domain.task_root_in_parent(ot) == domain.parent_rank: ot_needs = [] for _n in needed_names.keys(): _ntopo = needed_elems[_name_to_key(_n, needed_elems)] if ( hasattr(_ntopo, "task_id") and _ntopo.task_id == current_task ): continue else: ot_needs.append(_n) can_provide = [_ for _ in ot_needs if _ in available_names] to_remove = [] for prov in can_provide: ae = available_elems[ _name_to_key(prov, available_elems) ] ne = needed_elems[_name_to_key(prov, needed_elems)] if ae.task_id != ot and ne.task_id == ot: available_names[prov] = ne.task_id needed_names[prov] = ae.task_id else: to_remove.append(prov) for rm in to_remove: can_provide.remove(rm) available_names[rm] = None needed_names[rm] = None else: comm.isend( list(needed_names.keys()), 
dest=domain.task_root_in_parent(ot), tag=4321, ) ot_needs = comm.recv( source=domain.task_root_in_parent(ot), tag=4321 ) can_provide = [_ for _ in ot_needs if _ in available_names] for prov in can_provide: available_names[prov] = ot ae = available_elems[ _name_to_key(prov, available_elems) ] assert ot != ae.task_id comm.isend( can_provide, dest=domain.task_root_in_parent(ot), tag=1234, ) ot_provide = comm.recv( source=domain.task_root_in_parent(ot), tag=1234 ) for _op in ot_provide: needed_names[_op] = ot ne = needed_elems[_name_to_key(_op, needed_elems)] assert ot != ne.task_id if len(ot_needs) > 0: msg += "\n *Other task {} needs init for {}, we provide {}".format( ot, ot_needs, "nothing" if len(can_provide) == 0 else can_provide, ) if msg != "": gprint(" >[IT] Inter-tasks matching:" + msg) needed_names = {p: t for (p, t) in needed_names.items() if t is not None} available_names = { p: t for (p, t) in available_names.items() if t is not None } for current_task in current_tasks: tcomm = domain.get_task_comm(current_task) needed_names = tcomm.bcast(needed_names, root=0) available_names = tcomm.bcast(available_names, root=0) final_needed_names, final_available_names = {}, {} for current_task in current_tasks: _tmp_needed = dict( (k, v) for k, v in needed_names.items() if v != current_task ) _tmp_avail = dict( (k, v) for k, v in available_names.items() if v != current_task ) final_needed_names.update(_tmp_needed) final_available_names.update(_tmp_avail) needed_names, available_names = final_needed_names, final_available_names gprint( f" >[IT] Inter-tasks will send:to {available_names} and recieve:from {needed_names}" ) # Get back the actual field or parameter names_to_obj = {} for p in available_names.keys(): names_to_obj[p] = _name_to_key(p, available_elems) for p in needed_names.keys(): names_to_obj[p] = _name_to_key(p, needed_elems) # group parameters with same other task allp = [] tasks_to_name = {} for p in sorted( 
set(available_names.keys()).union(set(needed_names.keys())) ): t = ( available_names[p] if p in available_names.keys() else needed_names[p] ) if isinstance(names_to_obj[p], ScalarField): allp.append( [ p, ] ) else: if t in tasks_to_name: tasks_to_name[t].append(p) else: tasks_to_name[t] = [ p, ] for params in tasks_to_name.values(): allp.append(params) for p in sorted(allp): kwargs = {} s_topo, r_topo, comm_dir = (None,) * 3 var = tuple(names_to_obj[_] for _ in p) if p[0] in available_names: t = available_names[p[0]] topo = available_elems[var[0]] comm_dir = "src" s_topo = topo if p[0] in needed_names: t = needed_names[p[0]] topo = needed_elems[var[0]] comm_dir = "dest" r_topo = topo if not (s_topo is None or r_topo is None): comm_dir = "src&dest" t = None assert not comm_dir is None opname = "RI{}_{}{}{}_{}".format( comm_dir, "" if s_topo is None else s_topo.task_id, "to" if not s_topo is None and not r_topo is None else "", "" if r_topo is None else r_topo.task_id, ",".join(_.name for _ in var), ) # Finalize init call kwargs.update( { "name": opname, "source_topo": s_topo, "target_topo": r_topo, "other_task_id": t, } ) if isinstance(var[0], ScalarField): kwargs.update( { "variable": var[0], "mpi_params": topo.mpi_params, } ) else: kwargs.update( { "parameter": var, "domain": domain, } ) yield kwargs # iterate over ComputationalNodes node_id = 0 for node in [_ for _ in target_node.nodes]: gprint( "\n >Handling node {}::{}: {} {}".format( self.target_node.name, node_id, node.name, node.__class__ ) ) # Recursively build graph. # If current node is a ComputationalGraph, we have to first # build its own local graph and we extract all its operators (graph nodes). # Else if node is a ComputationalGraphOperator, we just take the # current node operator. 
subgraph, node_ops, node_vertices, from_subgraph = self.build_subgraph( node, current_level ) # iterate over subgraph operators for opvertex, op in zip(node_vertices, node_ops): # add operator node and fill vertex properties opnode = self.new_node( op, subgraph, current_level, node, node_id, opvertex ) if isinstance(node, RedistributeInter): assert self.search_intertasks_ops gprint(f" >[IT] Handling node {node_id}") available_elems, needed_elems = {}, {} for _node in target_node.nodes: if _node is node: break available_elems.update(_node.output_fields) available_elems.update(_node.output_params) for _node in target_node.nodes[::-1]: if _node is node: break needed_elems.update(_node.input_fields) needed_elems.update(_node.input_params) for it_redistribute_kwargs in __find_elements_to_redistribute( available_elems, needed_elems ): if "variable" in it_redistribute_kwargs.keys(): assert RedistributeInter.can_redistribute( *tuple( it_redistribute_kwargs[_] for _ in ( "source_topo", "target_topo", "other_task_id", ) ) ), str(it_redistribute_kwargs) if op.fake_init: op.__init__(**it_redistribute_kwargs) # Recompute fields requirements since no fields were given in first fake operator creation first_op, first_opnode = op, opnode else: if "variable" in it_redistribute_kwargs.keys(): op = RedistributeInter(**it_redistribute_kwargs) else: op = RedistributeInterParam(**it_redistribute_kwargs) target_node.nodes.insert( target_node.nodes.index(first_op), op ) gprint( "\n >Handling node {}::{}: {} {} :: {}".format( self.target_node.name, node_id, op.name, op.__class__, it_redistribute_kwargs, ) ) subgraph, node_ops, node_vertices, from_subgraph = ( self.build_subgraph(op, current_level) ) opvertex = node_vertices[0] opnode = new_vertex(graph, op) if isinstance(op, RedistributeInter): cstate = self.topology_states.setdefault( op.variable, self.new_topology_state(op.variable) ) node = op if isinstance(op, RedistributeInter): op.initialize(topgraph_method=self.target_node.method) 
op.get_and_set_field_requirements() __handle_node( node_id, node, subgraph, node_ops, node_vertices, from_subgraph, opvertex, op, opnode, ) node_id += 1 if isinstance(op, RedistributeInter) and op.fake_init: # Delete node because nothing has to be exchanged target_node.nodes.remove(op) graph.remove_node(opnode) else: __handle_node( node_id, node, subgraph, node_ops, node_vertices, from_subgraph, opvertex, op, opnode, ) node_id += 1 # On level=0 we print a summary (if asked) for input and output fields and # their topology. def _print_io_fields_params_summary(comment=""): msg = f"\nComputationalGraph {target_node.name} inputs {comment}:\n" if not self.input_fields and not self.input_params: msg += " no inputs\n" else: if self.input_fields: for ifield in sorted(self.input_fields, key=lambda x: x.name): itopo = self.input_fields[ifield] _, ireqs = self.input_topology_states[ifield] msg += " *Field {} on topo {}{}\n".format( ifield.name, itopo.id, f": {ireqs}" if GRAPH_BUILDER_DEBUG_LEVEL == 2 else "", ) if len(self.input_params) > 0: for iparam in sorted(ip.name for ip in self.input_params): msg += f" *Parameter {iparam}\n" msg += f"ComputationalGraph {target_node.name} outputs {comment}:\n" if not self.output_fields and not self.output_params: msg += " no outputs\n" else: if self.output_fields: for ofield in sorted(self.output_fields, key=lambda x: x.name): otopo = self.output_fields[ofield] _, oreqs = self.output_topology_states[ofield] msg += " *Field {} on topo {}{}\n".format( ofield.name, otopo.id, f": {oreqs}" if GRAPH_BUILDER_DEBUG_LEVEL == 2 else "", ) if len(self.output_params) > 0: for oparam in sorted(op.name for op in self.output_params): msg += f" *Parameter {oparam}\n" msg += "\n" gprint(msg) if current_level == 0: _print_io_fields_params_summary() is_graph_updated = False # iterate deferred nodes for op, opnode in self._deferred_operators: gprint(f" >Handling deferred node {op.name}") ifields = op.input_fields input_states = op_input_topology_states[op] 
output_states = op_output_topology_states[op] field_requirements = op.field_requirements for ifield, itopo in sorted(ifields.items(), key=lambda x: x[0].name): if itopo is not None: continue msg = "\nGraphBuilder {} could not automatically determine the " msg += "topology of input field {} in operator {}." msg += "\nTry to set a non empty TopologyDescriptor when passing " msg += "the variable parameters, when creating the operator." msg += "\nAutomatic topology detection is an experimental feature." msg = msg.format(target_node.name, ifield.name, op.name) if ifield not in self.topology_states: raise RuntimeError(msg) cstate = self.topology_states[ifield] if cstate.first_topology_and_dstate is None: raise RuntimeError(msg) (itopo, dstate, node, ireqs) = cstate.first_topology_and_dstate ifields[ifield] = itopo input_states[ifield] = dstate field_requirements.update_inputs({ifield: ireqs}) cstate.add_edge(graph, opnode, node, ifield, itopo) if current_level == 0: opnode.set_op_info(op, input_states, output_states) # add output field Writer if necessary if target_node._output_fields_to_dump is not None: from hysop.operator.hdf_io import HDF_Writer for fields, io_params, op_kwds in target_node._output_fields_to_dump: if not fields: fields = self.output_fields.keys() fields = tuple(sorted(fields, key=lambda x: x.name)) for field in fields: msg = f"{field.name} is not an output field." 
assert field in self.output_fields, msg target_topo = self.output_fields[field] variables = {field: target_topo} io_params = io_params.clone( filename=f"{io_params.filename}_{field.name}_out" ) op = HDF_Writer(io_params=io_params, variables=variables, **op_kwds) op.initialize(topgraph_method=self.target_node.method) op.get_and_set_field_requirements() opnode = self.new_node( op, None, current_level, None, None, None, None ) ifreqs = ( None if (current_level != 0) else field_requirements.get_input_requirement(field)[1] ) cstate = self.topology_states[field] state = cstate.handle_input( opnode, target_topo, ifreqs, graph, False ) input_states = {field: state} output_states = {} self.op_input_topology_states[op] = input_states self.op_output_topology_states[op] = output_states if current_level == 0: opnode.set_op_info(op, input_states, output_states) is_graph_updated = True # Alter states such that output topology states match input topology states # this is only done if required (outputs_are_inputs) and if we are # processing the top level (root) graph if (current_level == 0) and outputs_are_inputs: def _closure(field, itopo, itopostate, cstate): target_topo = itopo input_dfield_requirements, input_topology_state = itopostate requirements = input_dfield_requirements.copy() requirements.axes = (input_topology_state.axes,) requirements.memory_order = input_topology_state.memory_order cstate.output_as_input(target_topo, requirements, graph) # Update (on closure) to have output as close as possible to inputs if field in output_fields: orig_topo, orig_state = ( output_fields[field], cstate.discrete_topology_states[output_fields[field]], ) final_topo, final_state = ( target_topo, cstate.discrete_topology_states[target_topo], ) kept_topo_and_state = ( orig_topo == final_topo and orig_state == final_state ) if not kept_topo_and_state: msg = " > Update graph outputs {} from topology {}{} to {}{}".format( field.name, orig_topo.tag, f":{orig_state}" if GRAPH_BUILDER_DEBUG_LEVEL 
== 2 else "", final_topo.tag, f":{final_state}" if GRAPH_BUILDER_DEBUG_LEVEL == 2 else "", ) gprint(msg) self.output_fields[field] = target_topo self.output_topology_states[field] = ( None, cstate.discrete_topology_states[target_topo], ) return True return False # identify variables that needs a closure redistribute_fields = set(input_fields.keys()) for field in sorted(redistribute_fields, key=lambda x: x.name): assert field in input_topology_states is_graph_updated = _closure( field, input_fields[field], input_topology_states[field], self.topology_states[field], ) for f in sorted(self._double_check_inputs.keys(), key=lambda x: x.name): # all field used as input must have been written in each topology or never written written_topos = set(self.topology_states[f].write_nodes.keys()) read_topos = set(self._double_check_inputs[f].keys()) diff = read_topos - written_topos if len(written_topos) > 0 and len(diff) >= 0: for t in diff: if ( f in self.output_fields and self.output_fields[f].mpi_params.task_id != t.mpi_params.task_id ): print( "WARNING FOR MIXED-TASK DOUBLE CHECK for", f.name, (self.output_fields[f], t), ) else: assert f in input_topology_states is_graph_updated = _closure( f, t, self._double_check_inputs[f][t], self.topology_states[f], ) # Final intertask redistributes as closure if self.search_intertasks_ops and (current_level == 0) and outputs_are_inputs: gprint(" >[IT] Inter-task closure searching") available_elems, needed_elems = {}, {} needed_elems.update(self.input_fields) available_elems.update(self.output_fields) needed_elems.update(self.input_params) available_elems.update(self.output_params) # Find redistribute candidates for it_redistribute_kwargs in __find_elements_to_redistribute( available_elems, needed_elems ): if it_redistribute_kwargs: if "variable" in it_redistribute_kwargs.keys(): node = RedistributeInter(**it_redistribute_kwargs) else: node = RedistributeInterParam(**it_redistribute_kwargs) node_id = len(target_node.nodes) 
target_node.push_nodes(node) if isinstance(node, RedistributeInter): node.initialize() gprint( " >Handling node {}: {} {}".format( node_id, node.name, node.__class__ ) ) subgraph, node_ops, node_vertices, from_subgraph = ( self.build_subgraph(node, current_level) ) for opvertex, op in zip(node_vertices, node_ops): opnode = self.new_node( op, subgraph, current_level, node, node_id, opvertex ) __handle_node( node_id, node, subgraph, node_ops, node_vertices, from_subgraph, opvertex, op, opnode, ) is_graph_updated = True if current_level == 0 and is_graph_updated: _print_io_fields_params_summary("After closure and output dumping") # Check that the generated graph is a directed acyclic graph if not is_directed_acyclic_graph(graph): msg = "\nGenerated operator graph is not acyclic." raise RuntimeError(msg) # Transitive reduction of graph # This removes parallel and unnecessary transitive edges # ie. remove useless redondant dependencies reduced_graph = transitive_reduction(graph) # Lexicographical topological sort # => find out operator order for execution purposes # => have to be exactly the same on each MPI process. sorted_nodes = lexicographical_topological_sort(reduced_graph) for i, node in enumerate(sorted_nodes): node.op_ordering = i # Command queues (each color represents a command queue) # ie. 
try to find out data independent subgraphs color = 0 queues = {} for node in sorted_nodes: if node.command_queue is not None: continue nodes = [node] uncolored_childs = tuple( filter(lambda n: n.command_queue is None, reduced_graph.adj[node]) ) while len(uncolored_childs) > 0: vid = np.argmin([n.op_ordering for n in uncolored_childs]) node = uncolored_childs[vid] nodes.append(node) uncolored_childs = tuple( filter(lambda n: n.command_queue is None, reduced_graph.adj[node]) ) idx_range = (nodes[0].op_ordering, nodes[-1].op_ordering) if queues: qkeys = tuple(sorted(queues.keys())) color = qkeys[-1] + 1 for k in qkeys[::-1]: paths = queues[k] if paths[-1][1] < idx_range[0]: src = sorted_nodes[paths[-1][1]] dst = sorted_nodes[idx_range[0]] all_paths = all_simple_paths(reduced_graph, src, dst) if len(all_paths) > 0: color = k break queues.setdefault(color, []).append(idx_range) for node in nodes: node.command_queue = color self.reduced_graph = reduced_graph self.sorted_nodes = sorted_nodes self.nodes = tuple(map(lambda x: x.operator, sorted_nodes))
def build_subgraph(self, node, current_level, **kwds):
    """Collect the operators (and their vertices) contributed by ``node``.

    Returns
    -------
    (subgraph, node_ops, node_vertices, from_subgraph)
        ``subgraph`` is the node's reduced graph when ``node`` is itself a
        ComputationalGraph, else None; ``node_ops`` / ``node_vertices`` are
        parallel lists of operators and their source vertices (None when the
        operator has no pre-existing vertex); ``from_subgraph`` tells whether
        the operators came from a recursively built sub-graph.

    Notes
    -----
    Off-task nodes (``mpi_params`` set and not ``on_task``) contribute
    nothing and fall through to the empty return.
    """
    node_ops = []
    node_vertices = []
    subgraph = None
    from_subgraph = False
    if isinstance(node, RedistributeInter):
        node_operators = node.operators()
        node_ops.extend(node_operators)
        node_vertices += [None] * len(node_operators)
    elif isinstance(node, RedistributeInterParam):
        node_ops.extend(
            [
                node,
            ]
        )
        node_vertices += [
            None,
        ]
    elif node.mpi_params is None or node.mpi_params.on_task:
        if isinstance(node, Problem):
            # sub-problems build (and initialize) their own graph eagerly
            node._build_graph(
                current_level=current_level,
                outputs_are_inputs=True,
                search_intertasks_ops=node.search_intertasks_ops,
                **kwds,
            )
            assert node.graph_built, "Sub-problem should be already built"
            assert node.initialized, "Sub-problem should be already initialized"
            node_ops.append(node)
            node_vertices.append(None)
        elif isinstance(node, ComputationalGraph):
            # recursively build the sub-graph one level deeper and flatten it
            node._build_graph(current_level=current_level + 1, **kwds)
            node_ordering = node.sorted_nodes
            subgraph = node.reduced_graph
            from_subgraph = True
            for nid in node_ordering:
                _node = nid
                op = _node.operator
                node_vertices.append(_node)
                node_ops.append(op)
        elif isinstance(node, ComputationalGraphOperator):
            node_operators = node.operators()
            node_ops.extend(node_operators)
            node_vertices += [None] * len(node_operators)
        else:
            msg = "Unknown node type {}."
            raise NotImplementedError(msg.format(node.__class__.__name__))
    return subgraph, node_ops, node_vertices, from_subgraph
class __ParameterHandler:
    """Tracks read/write dependencies between operators that share scalar
    Parameters, adding graph edges to serialize writes against prior
    reads/writes."""

    def __init__(self, graph):
        # the directed graph being built
        self.graph = graph
        # Parameter -> vertex that last wrote it (or None)
        self.last_write_node = {}
        # Parameter -> list of vertices that read it since the last write
        self.reading_nodes = {}

    def add_edge(self, src_node, dst_node, parameter):
        """Add a dependency edge src->dst labelled with the parameter.
        Self-edges and edges with a missing endpoint are silently skipped."""
        if (
            (src_node is not None)
            and (dst_node is not None)
            and (src_node != dst_node)
        ):
            edge = new_edge(self.graph, src_node, dst_node, parameter)
            return edge
        else:
            return None

    def handle_input_parameter(self, parameter, opnode):
        """Register ``opnode`` as a reader of ``parameter``."""
        check_instance(parameter, Parameter)
        last_write_node = self.last_write_node.setdefault(parameter, None)
        reading_nodes = self.reading_nodes.setdefault(parameter, [])
        # add read dependency to last written node before current op
        # (inputs are modified before actual call to the operator)
        if last_write_node:
            self.add_edge(last_write_node, opnode, parameter)
        reading_nodes.append(opnode)

    def handle_output_parameter(self, parameter, opnode):
        """Register ``opnode`` as the new writer of ``parameter``, after all
        pending readers and the previous writer (write-after-read and
        write-after-write ordering)."""
        check_instance(parameter, Parameter)
        last_write_node = self.last_write_node.setdefault(parameter, None)
        reading_nodes = self.reading_nodes.setdefault(parameter, [])
        if last_write_node:
            self.add_edge(last_write_node, opnode, parameter)
        for rn in reading_nodes:
            self.add_edge(rn, opnode, parameter)
        # readers list is reset in place: the new write invalidates them
        reading_nodes[:] = []
        self.last_write_node[parameter] = opnode


class __ContinuousFieldState:
    """Per-continuous-field bookkeeping used during graph construction.

    Tracks, for one ScalarField, which topologies hold up-to-date data,
    which vertices read/write it, and the discrete topology state
    (axes/memory order) on each topology.  Automatically inserts
    Redistribute / Transpose / MemoryReordering operators to satisfy the
    requirements of each consuming operator.
    """

    def __init__(
        self,
        field,
        op_input_topology_states,
        op_output_topology_states,
        input_fields_to_dump,
        topgraph_method,
    ):
        # all states are related to this continuous field
        self.field = field
        # record input and output topology states when creating auto generating
        # new operators (shared dicts owned by the graph builder)
        self.op_input_topology_states = op_input_topology_states
        self.op_output_topology_states = op_output_topology_states
        # (topo, state, opnode, reqs) of the first use of this field
        self.first_topology_and_dstate = None
        # (io_params, op_kwds) when this input field must be dumped, else None
        self.dump_ifield = None
        if input_fields_to_dump is not None:
            for fields, io_params, op_kwds in input_fields_to_dump:
                # an empty field selection means "dump every input field"
                if (not fields) or (field in fields):
                    io_params = io_params.clone(
                        filename=f"{io_params.filename}_{field.name}_in"
                    )
                    self.dump_ifield = (io_params, op_kwds)
                    break
        # dictionary (topology -> node) that lastly wrote the field.
        # multiple topologies can be up to date at the same time after a
        # redistribute operator or after an operator that implements the
        # get_preserved_input_fields method.
        self.write_nodes = {}
        # dictionary (topology -> list of nodes) that are currently reading
        # field:topo
        self.read_nodes = {}
        # dictionary (topology -> TopologyState)
        self.discrete_topology_states = {}
        # method forwarded to generated operators' initialize()
        self.method = topgraph_method

    def add_vertex(self, graph, operator):
        """Create and return a new graph vertex for ``operator``."""
        return new_vertex(graph, operator)

    def add_edge(self, graph, src_node, dst_node, field, topology, reverse=False):
        """Add a field/topology-labelled edge src->dst (or dst->src when
        ``reverse``).  Self-edges and missing endpoints are skipped."""
        if (
            (src_node is not None)
            and (dst_node is not None)
            and (src_node != dst_node)
        ):
            if reverse:
                return new_edge(graph, dst_node, src_node, field, topology)
            else:
                return new_edge(graph, src_node, dst_node, field, topology)
        else:
            return None

    def push_generated_operators(
        self, op_generator, op_name_prefix, src_topo, graph
    ):
        """Insert the chain of operators produced by a generator
        (Redistribute/Transpose/MemoryReordering) into the graph, threading
        read/write bookkeeping and topology states from ``src_topo`` through
        each generated operator.  Returns the final destination topology."""
        field = self.field
        read_nodes = self.read_nodes
        write_nodes = self.write_nodes
        dstates = self.discrete_topology_states
        op_input_topology_states = self.op_input_topology_states
        op_output_topology_states = self.op_output_topology_states
        assert op_generator.nodes
        for i, op in enumerate(op_generator.nodes):
            # prefix names ("R"/"T"/"MR"), suffix with index when chained
            op.name = f"{op_name_prefix}_{op.name}"
            if len(op_generator.nodes) > 1:
                op.name += f"__{i}"
            op.initialize(topgraph_method=self.method)
            # generated operators are single-field, field-preserving
            assert len(op.input_fields) == 1
            assert len(op.output_fields) == 1
            assert next(iter(op.input_fields)) == field
            assert next(iter(op.output_fields)) == field
            assert next(iter(op.input_fields.values())) == src_topo
            dst_topo = next(iter(op.output_fields.values()))
            op_node = self.add_vertex(graph, op)
            # handle input
            if src_topo in write_nodes:
                src_node = write_nodes[src_topo]
                self.add_edge(graph, src_node, op_node, field, src_topo)
            # handle output
            ro_nodes = read_nodes.setdefault(dst_topo, [])
            for ro_node in ro_nodes:
                self.add_edge(graph, ro_node, op_node, field, dst_topo)
            read_nodes[dst_topo] = []
            write_nodes[dst_topo] = op_node
            if dst_topo is not src_topo:
                # the generated op also reads the source topology
                read_nodes.setdefault(src_topo, []).append(op_node)
            istate = {field: dstates[src_topo]}
            dstates[dst_topo] = op.output_topology_state(field, istate)
            ostate = {field: dstates[dst_topo]}
            assert op not in op_input_topology_states
            assert op not in op_output_topology_states
            op_input_topology_states[op] = istate
            op_output_topology_states[op] = ostate
            op_node.set_op_info(op, istate, ostate)
            # chain: output of this op feeds the next generated op
            src_node = op_node
            src_topo = dst_topo
        return dst_topo

    def redistribute(self, target_topo, graph, src_topo=None):
        """Make the field up to date on ``target_topo`` by generating a
        Redistribute operator from one of the up-to-date topologies
        (``src_topo`` forces the source).  No-op when the field was never
        written or is already up to date on the target.

        Raises RedistributeNotImplementedError when no backend operator
        can perform the redistribution."""
        field = self.field
        write_nodes = self.write_nodes
        dstates = self.discrete_topology_states
        # field has never been written
        if not write_nodes:
            return
        src_topos = write_nodes.keys()
        if src_topo is not None:
            assert src_topo in src_topos
            src_topos = (src_topo,)
        if target_topo in src_topos:
            # topology is already up to date with lastest write, nothing to do
            return
        msg0 = "field {} from up to date topology:"
        msg0 += "\n |-{}\n to topology\n |>{}"
        msg0 = msg0.format(
            field.name,
            "\n |-".join(t.short_description() for t in src_topos),
            target_topo.short_description(),
        )
        # NOTE(review): gprint/gprint2 are module-level logging helpers
        # defined outside this excerpt — presumably verbose graph printers.
        gprint(f" >Redistribute {msg0}")
        # field is out of date on target topology, we should redistribute data
        # from another topology
        try:
            redistribute_generator = Redistribute(
                variables=field, source_topos=src_topos, target_topo=target_topo
            )
            redistribute_generator.generate()
        except RedistributeNotImplementedError:
            msg = "FATAL ERROR: Graph builder could not find suitable operator on "
            msg += "backend {} to redistribute {}"
            msg = msg.format(src_topo.backend.kind, msg0)
            print(f"\n{msg}\n")
            raise
        # the generator picked the actual source topology
        src_topo = redistribute_generator.nodes[0].source_topo
        assert src_topo in src_topos
        dst_topo = self.push_generated_operators(
            redistribute_generator, "R", src_topo, graph
        )
        assert dst_topo == target_topo

    def transpose(self, topo, target_axes, graph):
        """Generate a Transpose operator so the field state on ``topo``
        matches one of the allowed ``target_axes`` permutations.  No-op when
        no axes are required or the current axes are already allowed.

        Raises TranspositionNotImplementedError when no backend operator
        can perform any candidate permutation."""
        field = self.field
        write_nodes = self.write_nodes
        dstates = self.discrete_topology_states
        assert topo in dstates
        src_state = dstates[topo]
        if not target_axes:
            return
        if src_state.axes in target_axes:
            return
        msg = (
            " >Transpose from state {} to any of those transposition states "
        )
        msg += "[{},] "
        msg = msg.format(
            src_state.tstate,
            ", ".join(
                [
                    str(TranspositionState.axes_to_tstate(axes))
                    for axes in target_axes
                ]
            ),
        )
        gprint(msg)

        # permutation mapping src_axes onto dst_axes
        def find_permutation(src_axes, dst_axes):
            axes = ()
            for ai in dst_axes:
                aj = src_axes.index(ai)
                axes += (aj,)
            return axes

        # one candidate permutation per allowed target axes ordering
        candidate_axes = ()
        for target in target_axes:
            axes = find_permutation(src_state.axes, target)
            candidate_axes += (axes,)
        try:
            transpose_generator = Transpose(
                fields=field, variables={field: topo}, axes=candidate_axes
            )
            transpose_generator.generate()
        except TranspositionNotImplementedError:
            msg = "FATAL ERROR: Graph builder could not find suitable operator on "
            msg += "backend {} to transpose from state {} to any of those "
            msg += "transposition states [{},] for field {} on topology id {}."
            msg = msg.format(
                topo.backend.kind,
                src_state.tstate,
                ", ".join(
                    [
                        TranspositionState.axes_to_tstate(axes)
                        for axes in target_axes
                    ]
                ),
                field.name,
                topo.id,
            )
            print(f"\n{msg}\n")
            raise
        # transposition happens in-place on the same topology
        dst_topo = self.push_generated_operators(
            transpose_generator, "T", topo, graph
        )
        assert dst_topo == topo

    def reorder(self, topo, target_memory_order, graph):
        """Generate a MemoryReordering operator so the field on ``topo``
        matches ``target_memory_order``.  No-op when orders already match or
        any order is accepted.

        Raises MemoryReorderingNotImplementedError when no backend operator
        can perform the reordering."""
        field = self.field
        write_nodes = self.write_nodes
        dstates = self.discrete_topology_states
        assert topo in dstates
        src_state = dstates[topo]
        assert src_state.memory_order is not MemoryOrdering.ANY
        if src_state.memory_order == target_memory_order:
            return
        if target_memory_order is MemoryOrdering.ANY:
            return
        msg = " >MemoryReordering from memory order {} to memory order {}."
        msg = msg.format(src_state.memory_order, target_memory_order)
        gprint(msg)
        try:
            reorder_generator = MemoryReordering(
                fields=field,
                variables={field: topo},
                target_memory_order=target_memory_order,
            )
            reorder_generator.generate()
        except MemoryReorderingNotImplementedError:
            msg = "FATAL ERROR: Graph builder could not find suitable operator on "
            msg += "backend {} to reorder a field from order {} to order {} "
            msg += "for field {} on topology id {}."
            msg = msg.format(
                topo.backend.kind,
                src_state.memory_order,
                target_memory_order,
                field.name,
                topo.id,
            )
            print(f"\n{msg}\n")
            raise
        # reordering happens in-place on the same topology
        dst_topo = self.push_generated_operators(
            reorder_generator, "MR", topo, graph
        )
        assert dst_topo == topo

    def handle_input(
        self, opnode, target_topo, target_dfield_requirements, graph, is_new
    ):
        """Prepare the field as an input of ``opnode`` on ``target_topo``.

        At root level (``target_dfield_requirements`` given) this may set the
        initial topology state and insert transpose/redistribute/reorder
        operators so the state satisfies the requirements; it also wires
        write->read dependency edges and optionally a dump writer for new
        graph inputs.  Returns the resulting input TopologyState (None when
        not root)."""
        ifield = self.field
        write_nodes = self.write_nodes
        read_nodes = self.read_nodes
        dtopology_states = self.discrete_topology_states
        # requirements are only given when called from the root graph
        is_root = target_dfield_requirements is not None
        dim = target_topo.domain.dim
        tid = target_topo.task_id
        check_instance(
            target_dfield_requirements, DiscreteFieldRequirements, allow_none=True
        )
        assert (not is_root) or (target_dfield_requirements.field == ifield)
        # if this is a new input for the graph, should we create a dumper ?
        dump_input = False
        dump_input = is_new and (self.dump_ifield is not None)
        if dump_input:
            # local import avoids a module-level dependency on HDF5 I/O
            from hysop.operator.hdf_io import HDF_Writer

            io_params, op_kwds = self.dump_ifield
            variables = {ifield: target_topo}
            writer_op = HDF_Writer(
                io_params=io_params, variables=variables, **op_kwds
            )
            writer_op.initialize(topgraph_method=self.method)
            writer_op.get_and_set_field_requirements()
            writer_opnode = self.add_vertex(graph, writer_op)
            # dump the input before the consuming operator runs
            self.add_edge(graph, writer_opnode, opnode, ifield, target_topo)
        # we only handle input field requirements when we are root graph
        # ie. target_dfield_requirements is None
        # We do this because overall graph inputs will determine initial states
        # and the start topologies for each variables
        if is_root:
            # check if the field has ever been written
            if not write_nodes:
                # has it already been read ?
                if not read_nodes:
                    # adapt to this first operator
                    assert target_topo not in dtopology_states
                    istate = dtopology_states.setdefault(
                        target_topo, CartesianTopologyState(dim, tid)
                    )
                    if target_dfield_requirements:
                        # pick default axes when allowed, else first allowed
                        allowed_axes = target_dfield_requirements.axes
                        default_axes = TranspositionState[dim].default_axes()
                        if (allowed_axes is None) or (default_axes in allowed_axes):
                            istate.axes = default_axes
                        else:
                            istate.axes = allowed_axes[0]
                        allowed_memory_order = (
                            target_dfield_requirements.memory_order
                        )
                        default_memory_order = self.discrete_topology_states[
                            target_topo
                        ].memory_order
                        assert default_memory_order is not MemoryOrdering.ANY
                        if allowed_memory_order is MemoryOrdering.ANY:
                            istate.memory_order = default_memory_order
                        else:
                            istate.memory_order = allowed_memory_order
                    gprint2(f" >Initial state set to {istate}")
                else:
                    istate = dtopology_states.setdefault(
                        target_topo, CartesianTopologyState(dim, tid)
                    )
                    gprint2(f" >Input state is {istate}")
            target_axes = target_dfield_requirements.axes
            target_memory_order = target_dfield_requirements.memory_order

            # heuristic score: higher means cheaper to reach target_topo
            def topology_affinity(candidate_topo):
                candidate_state = self.discrete_topology_states[candidate_topo]
                # discard out-of taks topos
                score = (candidate_topo.task_id != target_topo.task_id) * -10000000
                # skip redistribute
                score += (candidate_topo is target_topo) * 1000000
                # skip multiresolution filter (not automatically handled yet)
                score += (
                    candidate_topo.grid_resolution == target_topo.grid_resolution
                ).all() * 100000
                # skip transpose
                score += (
                    (target_axes is not None)
                    and (candidate_state.axes in target_axes)
                ) * 10000
                # better bandwidth
                score += (candidate_topo.backend is target_topo.backend) * 1000
                # better bandwidth
                score += (
                    candidate_topo.backend.kind is target_topo.backend.kind
                ) * 100
                # memory reordering is a noop
                score += (
                    (target_memory_order is not MemoryOrdering.ANY)
                    and (candidate_state.memory_order is target_memory_order)
                ) * 1
                # penalize number of ghosts
                score -= np.prod(candidate_topo.ghosts)
                return score

            if (target_topo.backend.kind is Backend.HOST) and write_nodes:
                # give source topo priority according to topology_affinity
                src_topos = write_nodes.keys()
                src_topos = tuple(
                    sorted(src_topos, key=topology_affinity, reverse=True)
                )
                src_topo = src_topos[0]
                if target_topo.mpi_params.task_id != src_topo.mpi_params.task_id:
                    # cross-task: just propagate the source state
                    dtopology_states[target_topo] = src_topo.topology_state
                else:
                    if src_topo is not target_topo:
                        msg = " >Redistributing field {} from up to date topologies {} "
                        msg += "to host topology {}."
                        msg = msg.format(
                            ifield.name,
                            " ,".join(t.pretty_tag for t in src_topos),
                            target_topo.pretty_tag,
                        )
                        gprint(msg)
                    # host target: transpose on the source, then move data
                    self.transpose(src_topo, target_axes, graph)
                    self.redistribute(target_topo, graph, src_topo=src_topo)
                    # we can always reorder target because this a host topology
                    self.reorder(target_topo, target_memory_order, graph)
            elif (target_topo.backend.kind is Backend.OPENCL) and write_nodes:
                # give source topo priority according to topology_affinity
                src_topos = write_nodes.keys()
                src_topos = tuple(
                    sorted(src_topos, key=topology_affinity, reverse=True)
                )
                src_topo = src_topos[0]
                if target_topo.mpi_params.task_id != src_topo.mpi_params.task_id:
                    # cross-task: just propagate the source state
                    dtopology_states[target_topo] = src_topo.topology_state
                else:
                    if src_topo is not target_topo:
                        msg = " >Redistributing field {} from up to date topologies {} "
                        msg += "to device topology {}."
                        msg = msg.format(
                            ifield.name,
                            " ,".join(t.pretty_tag for t in src_topos),
                            target_topo.pretty_tag,
                        )
                        gprint(msg)
                    # device target: reorder on the source, then move data
                    self.reorder(src_topo, target_memory_order, graph)
                    self.redistribute(target_topo, graph, src_topo=src_topo)
                    # target is always opencl so we transpose here
                    self.transpose(target_topo, target_axes, graph)
            else:
                # field not written yet (or other backend): adapt in place
                self.transpose(target_topo, target_axes, graph)
                self.reorder(target_topo, target_memory_order, graph)
            istate = dtopology_states[target_topo]
            gprint2(f" >Input state is now {istate}")
        else:
            istate = None
        if dump_input:
            input_states = {ifield: istate}
            output_states = {}
            self.op_input_topology_states[writer_op] = input_states
            self.op_output_topology_states[writer_op] = output_states
            writer_opnode.set_op_info(writer_op, input_states, output_states)
        # add read dependency to last written node before current op
        # (so that inputs are modified before actual call to the operator)
        if target_topo in write_nodes:
            last_write_node = write_nodes[target_topo]
            self.add_edge(graph, last_write_node, opnode, ifield, target_topo)
        elif (not is_root) and write_nodes:
            for node in self.write_nodes.values():
                self.add_edge(graph, node, opnode, ifield, target_topo)
        read_nodes.setdefault(target_topo, []).append(opnode)
        if is_new:
            # remember where this field first entered the graph
            self.first_topology_and_dstate = (
                target_topo,
                istate,
                opnode,
                target_dfield_requirements,
            )
        return istate

    def handle_output(
        self,
        opnode,
        output_topo,
        oreqs,
        operator,
        input_topology_states,
        invalidate_field,
        graph,
        node_list=[],
    ):
        """Register ``opnode`` as the new writer of the field on
        ``output_topo``, wiring write-after-write / write-after-read edges
        and (unless the operator preserves the field) invalidating every
        other topology.  Returns the output TopologyState (None when not
        root, i.e. ``input_topology_states`` is None).

        NOTE(review): ``node_list=[]`` is a mutable default argument; it is
        only read here (``in`` / ``index``) so it is currently harmless, but
        callers should not rely on it being shared."""
        ofield = self.field
        write_nodes = self.write_nodes
        read_nodes = self.read_nodes
        dtopology_states = self.discrete_topology_states
        is_root = input_topology_states is not None
        # add dependency to last node written to prevent
        # concurent write-writes.
        if output_topo in write_nodes:
            src = write_nodes[output_topo]
            # when both operators appear in node_list, their declared order
            # decides the edge direction (reverse when src comes later)
            check_reverse = src.operator in node_list and operator in node_list
            self.add_edge(
                graph,
                src,
                opnode,
                ofield,
                output_topo,
                reverse=check_reverse
                and node_list.index(src.operator) > node_list.index(operator),
            )
        if invalidate_field:
            msg = " >Invalidating output field {} on all topologies but {} "
            msg += "because is has been freshly written."
            msg = msg.format(ofield.name, output_topo.pretty_tag)
            gprint(msg)
            # add dependency to all operators that reads this field
            # to prevent concurent read-writes.
            if output_topo in read_nodes:
                for ro_node in read_nodes[output_topo]:
                    if not ro_node is None:
                        check_reverse = (
                            ro_node.operator in node_list and operator in node_list
                        )
                        self.add_edge(
                            graph,
                            ro_node,
                            opnode,
                            ofield,
                            output_topo,
                            reverse=check_reverse
                            and node_list.index(ro_node.operator)
                            > node_list.index(operator),
                        )
                    else:
                        self.add_edge(graph, ro_node, opnode, ofield, output_topo)
            # remove read/write dependencies and states
            write_nodes.clear()
            dtopology_states.clear()
        else:
            msg = (
                " >Keeping output field {} up to date on all topologies because "
            )
            msg += "is has been marked as preserved by operator."
            msg = msg.format(ofield.name)
            gprint(msg)
            msg = " >Up to date topologies for field {} are now {}, {}."
            msg = msg.format(
                ofield.name,
                output_topo.pretty_tag,
                " ,".join(t.pretty_tag for t in write_nodes),
            )
            gprint(msg)
        # add the operator node as the one that lastly wrote this field.
        # no other operators can be reading as this topology just been written.
        read_nodes[output_topo] = []
        write_nodes[output_topo] = opnode
        if is_root:
            if isinstance(operator, Problem):
                # sub-problems expose their final states directly
                ostate = operator.final_output_topology_states[ofield][1]
            else:
                ostate = operator.output_topology_state(
                    ofield, input_topology_states
                )
            dtopology_states[output_topo] = ostate
            gprint2(f" >Output state is now {ostate}")
        else:
            ostate = None
        if self.first_topology_and_dstate is None:
            # field first appears in the graph as an output
            self.first_topology_and_dstate = (output_topo, ostate, opnode, oreqs)
        return ostate

    def output_as_input(self, target_topo, dstate, graph):
        """Re-inject a graph output as a (non-new) input so that final
        states match initial states; ``dstate`` is passed through as the
        requirements argument of handle_input."""
        self.handle_input(None, target_topo, dstate, graph, False)